// Berkeley Open Infrastructure for Network Computing
// http://boinc.berkeley.edu
// Copyright (C) 2005 University of California
//
// This is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation;
// either version 2.1 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// To view the GNU Lesser General Public License visit
// http://www.gnu.org/copyleft/lesser.html
// or write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

// Abstraction of a set of executing applications,
// connected to I/O files in various ways.
// Ideally this shouldn't depend on CLIENT_STATE
// (it currently still uses the global gstate in several places).

#include "cpp.h"

#ifdef _WIN32
#include "boinc_win.h"
#else 
#include "config.h"
#endif

#ifndef _WIN32

#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#if HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#if HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <cctype>
#include <ctime>
#include <cstdio>
#include <cmath>
#include <cstdlib>

#endif

#include "client_state.h"
#include "client_types.h"
#include "error_numbers.h"
#include "filesys.h"
#include "file_names.h"
#include "parse.h"
#include "shmem.h"
#include "util.h"
#include "client_msgs.h"
#include "procinfo.h"
#include "app.h"

using std::max;
using std::min;

// Construct an ACTIVE_TASK in an idle, not-yet-started state.
// Fields that cleanup_task() (run from the destructor) or the periodic
// checks read are given defined values here, so a task that is discarded
// before it is initialized or started -- e.g. one rejected by
// ACTIVE_TASK_SET::parse() -- can still be destroyed safely.
//
ACTIVE_TASK::ACTIVE_TASK() {
    result = NULL;
    wup = NULL;
    app_version = NULL;
    pid = 0;
    slot = 0;
    task_state = PROCESS_UNINITIALIZED;
    scheduler_state = CPU_SCHED_UNINITIALIZED;
    signal = 0;
    strcpy(slot_dir, "");
    is_ss_app = false;
    graphics_mode_acked = MODE_UNSUPPORTED;
    graphics_mode_before_ss = MODE_HIDE_GRAPHICS;
    graphics_mode_ack_timeout = 0;
    exit_requested = false;
    fraction_done = 0;
    episode_start_cpu_time = 0;
    run_interval_start_wall_time = gstate.now;
    debt_interval_start_cpu_time = 0;
    checkpoint_cpu_time = 0;
    checkpoint_wall_time = 0;
    current_cpu_time = 0;
    have_trickle_down = false;
    send_upload_file_status = false;
    pending_suspend_via_quit = false;
    too_large = false;
    want_network = 0;
    memset(&procinfo, 0, sizeof(procinfo));

    // Resource limits are filled in by init(); zero until then.
    max_cpu_time = 0;
    max_disk_usage = 0;
    max_mem_usage = 0;

    // Only consulted while the task is PROCESS_ABORT_PENDING (see poll()).
    abort_time = 0;

    // cleanup_task() detaches shared memory iff this is non-NULL,
    // so it must not be left indeterminate for a task that never attached.
    app_client_shm.shm = NULL;
#ifdef _WIN32
    pid_handle = 0;
    thread_handle = 0;
    shm_handle = 0;
#endif
}

#ifdef _WIN32

// Close the OS handles for the app's process and main thread.
// Use this when a process has exited but will be started again
// (e.g. suspend via quit, or exited but no finish file).
// In these cases the shmem and events are kept; only the handles go.
// Each handle is reset to NULL, so calling this twice is harmless.
//
void ACTIVE_TASK::close_process_handles() {
    if (pid_handle != NULL) {
        CloseHandle(pid_handle);
        pid_handle = NULL;
    }
    if (thread_handle != NULL) {
        CloseHandle(thread_handle);
        thread_handle = NULL;
    }
}
#endif

// Release what is held for a process that has exited and will NOT be
// restarted. On Windows: the process/thread handles and the shared-memory
// mapping. Elsewhere: the shared-memory segment is detached and destroyed,
// with any failure logged but otherwise ignored.
// app_client_shm.shm is reset to NULL, so a repeated call does nothing more.
//
void ACTIVE_TASK::cleanup_task() {
#ifdef _WIN32
    close_process_handles();

    // Detaching also destroys the shmem segment,
    // since we hold the last attachment.
    //
    if (app_client_shm.shm != NULL) {
        detach_shmem(shm_handle, app_client_shm.shm);
        app_client_shm.shm = NULL;
    }
#else
    if (app_client_shm.shm == NULL) return;

    int retval = detach_shmem(app_client_shm.shm);
    if (retval) {
        msg_printf(NULL, MSG_ERROR,
            "Couldn't detach shared memory: %s", boincerror(retval)
        );
    }
    retval = destroy_shmem(shmem_seg_name);
    if (retval) {
        msg_printf(NULL, MSG_ERROR,
            "Couldn't destroy shared memory: %s", boincerror(retval)
        );
    }
    app_client_shm.shm = NULL;
#endif
}

// Destroying a task releases its process handles / shared memory
// via cleanup_task().
ACTIVE_TASK::~ACTIVE_TASK() {
    this->cleanup_task();
}

// Bind this task to result rp: cache its workunit and app version,
// derive resource limits from the workunit, name the per-task message
// queues after the result, and compute the slot directory path.
// Always returns 0.
//
int ACTIVE_TASK::init(RESULT* rp) {
    result = rp;
    wup = rp->wup;
    app_version = wup->avp;

    // Limits come from the workunit's bounds; the CPU-time limit converts
    // the FLOP bound into seconds using the host's measured FP speed.
    max_cpu_time = wup->rsc_fpops_bound/gstate.host_info.p_fpops;
    max_disk_usage = wup->rsc_disk_bound;
    max_mem_usage = wup->rsc_memory_bound;

    strcpy(process_control_queue.name, rp->name);
    strcpy(graphics_request_queue.name, rp->name);

    get_slot_dir(slot, slot_dir);
    return 0;
}

#if 0
// Deallocate memory to prevent unneeded reporting of memory leaks
// (currently compiled out).
// Erases every entry of active_tasks and deletes the ACTIVE_TASK each one
// pointed to. Because erasing always happens at the front, active_tasks[0]
// is the same element as *at_iter on every pass.
//
void ACTIVE_TASK_SET::free_mem() {
    vector<ACTIVE_TASK*>::iterator at_iter;
    ACTIVE_TASK *at;

    at_iter = active_tasks.begin();
    while (at_iter != active_tasks.end()) {
        at = active_tasks[0];
        at_iter = active_tasks.erase(at_iter);
        delete at;
    }
}
#endif

// Refresh memory/CPU statistics for executing tasks, at most every 10 s.
// For each task in PROCESS_EXECUTING state this re-reads atp->procinfo via
// procinfo_app(), then:
// - keeps a smoothed working-set size (mean of the previous smoothed value
//   and the new sample; the first sample seeds it directly), and
// - computes the page-fault rate since the previous sample.
// If procinfo_setup() fails, nothing is updated.
//
void ACTIVE_TASK_SET::get_memory_usage() {
    static double last_mem_time=0;
    unsigned int i;
    int retval;

    double diff = gstate.now - last_mem_time;
    if (diff < 10) return;

    last_mem_time = gstate.now;
    vector<PROCINFO> piv;
    retval = procinfo_setup(piv);
    if (retval) {
        if (log_flags.mem_usage_debug) {
            msg_printf(0, MSG_ERROR,
                "[mem_usage_debug] procinfo_setup() returned %d", retval
            );
        }
        return;
    }
    for (i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        if (atp->task_state != PROCESS_EXECUTING) continue;

        PROCINFO& pi = atp->procinfo;

        // Save what we need from the previous sample; the memset wipes it.
        unsigned long last_page_fault_count = pi.page_fault_count;
        double last_wss_smoothed = pi.working_set_size_smoothed;

        memset(&pi, 0, sizeof(pi));
        pi.id = atp->pid;
        procinfo_app(pi, piv);

        // Exponential smoothing with weight 1/2. Previously the memset above
        // zeroed the old value first, so no smoothing actually happened.
        if (last_wss_smoothed > 0) {
            pi.working_set_size_smoothed =
                .5*(last_wss_smoothed + pi.working_set_size);
        } else {
            pi.working_set_size_smoothed = pi.working_set_size;
        }

        // The fault counter can go backwards (e.g. the app process was
        // restarted); report 0 rather than a wrapped/negative delta.
        double pf = 0;
        if (pi.page_fault_count >= last_page_fault_count) {
            pf = static_cast<double>(pi.page_fault_count - last_page_fault_count);
        }
        pi.page_fault_rate = pf/diff;

        if (log_flags.mem_usage_debug) {
            msg_printf(atp->result->project, MSG_INFO,
                "[mem_usage_debug] %s: RAM %.2fMB, page %.2fMB, %.2f page faults/sec, user CPU %.3f, kernel CPU %.3f",
                atp->result->name,
                pi.working_set_size/MEGA, pi.swap_size/MEGA,
                pi.page_fault_rate,
                pi.user_time, pi.kernel_time
            );
        }
    }

#if 0
    // the following is not useful because most OSs don't
    // move idle processes out of RAM, so physical memory is always full
    //
    procinfo_other(pi, piv);
    msg_printf(NULL, MSG_INFO, "All others: RAM %.2fMB, page %.2fMB, user %.3f, kernel %.3f",
        pi.working_set_size/MEGA, pi.swap_size/MEGA,
        pi.user_time, pi.kernel_time
    );
#endif
}

// Do periodic checks on running apps:
// - get latest CPU time and % done info
// - check if any has exited, and clean up
// - see if any has exceeded its CPU or disk space limits, and abort it
//
bool ACTIVE_TASK_SET::poll() {
    bool action;
    unsigned int i;
    static double last_time = 0;
    if (gstate.now - last_time < 1.0) return false;
    last_time = gstate.now;

    action = check_app_exited();
    send_heartbeats();
    send_trickle_downs();
    graphics_poll();
    process_control_poll();
    get_memory_usage();
    action |= check_rsc_limits_exceeded();
    action |= get_msgs();
    for (i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        if (atp->task_state == PROCESS_ABORT_PENDING) {
            if (gstate.now > atp->abort_time + 5.0) {
                atp->kill_task();
                atp->task_state = PROCESS_ABORTED;
            }
        }
    }

    if (action) {
        gstate.set_client_state_dirty("ACTIVE_TASK_SET::poll");
    }

    return action;
}

// Remove an ACTIVE_TASK from the set.
// The ACTIVE_TASK object itself is NOT deleted; that is up to the caller.
// Returns 0 if it was found and removed; otherwise logs an error and
// returns ERR_NOT_FOUND.
//
int ACTIVE_TASK_SET::remove(ACTIVE_TASK* atp) {
    vector<ACTIVE_TASK*>::iterator pos;
    for (pos = active_tasks.begin(); pos != active_tasks.end(); ++pos) {
        if (*pos == atp) {
            active_tasks.erase(pos);
            return 0;
        }
    }
    msg_printf(NULL, MSG_ERROR, "Task %s not found", atp->result->name);
    return ERR_NOT_FOUND;
}

// There's a new trickle file (trickle_up.xml) in the slot dir.
// Move it to the project dir as trickle_up_<result name>_<unix time>.xml.
// If it can't be moved, delete it and return ERR_RENAME; else return 0.
//
int ACTIVE_TASK::move_trickle_file() {
    char project_dir[256], timestamp[32];
    int retval;

    get_project_dir(result->project, project_dir);
    sprintf(timestamp, "%d", (int)time(0));

    // Build the paths as std::strings: project_dir and result->name are not
    // length-checked, and formatting them into fixed 256-byte buffers
    // with sprintf could overflow.
    std::string old_path = std::string(slot_dir) + "/trickle_up.xml";
    std::string new_path = std::string(project_dir)
        + "/trickle_up_" + result->name + "_" + timestamp + ".xml";
    retval = boinc_rename(old_path.c_str(), new_path.c_str());

    // if can't move it, remove
    //
    if (retval) {
        boinc_delete_file(old_path.c_str());
        return ERR_RENAME;
    }
    return 0;
}

// Estimated CPU time (seconds) still needed to finish this task.
// It is a weighted average of two estimates of the remaining time:
// - projected from current CPU time and fraction done, weight fraction_done;
// - the result's estimated_cpu_time() (based on the workunit's flops count)
//   scaled by the fraction left, weight (1 - fraction_done).
// Returns 0 once fraction_done >= 1, and the unscaled estimate while
// fraction_done <= 0.
//
double ACTIVE_TASK::est_cpu_time_to_completion() {
    if (fraction_done >= 1) return 0;

    double wu_estimate = result->estimated_cpu_time();
    if (fraction_done <= 0) return wu_estimate;

    double projected_total = current_cpu_time / fraction_done;
    double progress_estimate = projected_total - current_cpu_time;
    double remaining = 1 - fraction_done;
    return fraction_done*progress_estimate + remaining*remaining*wu_estimate;
}

// size of output files and files in slot dir
//
int ACTIVE_TASK::current_disk_usage(double& size) {
    double x;
    unsigned int i;
    int retval;
    FILE_INFO* fip;
    char path[256];

    retval = dir_size(slot_dir, size);
    if (retval) return retval;
    for (i=0; i<result->output_files.size(); i++) {
        fip = result->output_files[i].file_info;
        get_pathname(fip, path);
        retval = file_size(path, x);
        if (!retval) size += x;
    }
    return 0;
}

// Get the next free slot
//
int ACTIVE_TASK_SET::get_free_slot() {
    unsigned int i;
    int j;
    bool found;

    for (j=0; ; j++) {
        found = false;
        for (i=0; i<active_tasks.size(); i++) {
            if (active_tasks[i]->slot == j) {
                found = true;
                break;
            }
        }
        if (!found) return j;
    }
    return ERR_NOT_FOUND;   // probably never get here
}

bool ACTIVE_TASK_SET::slot_taken(int slot) {
    unsigned int i;
    for (i=0; i<active_tasks.size(); i++) {
        if (active_tasks[i]->slot == slot) return true;
    }
    return false;
}

// Write this task's state as an <active_task> XML element
// (the format read back by ACTIVE_TASK::parse()).
// Graphics info is included only if the app supports graphics and
// graphics aren't disabled in the client. Always returns 0.
//
int ACTIVE_TASK::write(MIOFILE& fout) {
    const char* too_large_line = too_large ? "   <too_large/>\n" : "";

    fout.printf(
        "<active_task>\n"
        "    <project_master_url>%s</project_master_url>\n"
        "    <result_name>%s</result_name>\n"
        "    <active_task_state>%d</active_task_state>\n"
        "    <app_version_num>%d</app_version_num>\n"
        "    <slot>%d</slot>\n"
        "    <scheduler_state>%d</scheduler_state>\n"
        "    <checkpoint_cpu_time>%f</checkpoint_cpu_time>\n"
        "    <fraction_done>%f</fraction_done>\n"
        "    <current_cpu_time>%f</current_cpu_time>\n"
        "    <swap_size>%f</swap_size>\n"
        "    <working_set_size>%f</working_set_size>\n"
        "    <working_set_size_smoothed>%f</working_set_size_smoothed>\n"
        "    <page_fault_rate>%f</page_fault_rate>\n"
        "%s",
        result->project->master_url,
        result->name,
        task_state,
        app_version->version_num,
        slot,
        scheduler_state,
        checkpoint_cpu_time,
        fraction_done,
        current_cpu_time,
        procinfo.swap_size,
        procinfo.working_set_size,
        procinfo.working_set_size_smoothed,
        procinfo.page_fault_rate,
        too_large_line
    );

    const bool write_graphics = supports_graphics() && !gstate.disable_graphics;
    if (write_graphics) {
        fout.printf(
            "   <supports_graphics/>\n"
            "   <graphics_mode_acked>%d</graphics_mode_acked>\n",
            graphics_mode_acked
        );
    }

    fout.printf("</active_task>\n");
    return 0;
}

// Parse one <active_task> element of the client state file.
// The caller has already consumed the opening <active_task> line; this
// reads lines up to </active_task>, then resolves the project, result and
// app version it names and runs sanity checks.
// Returns:
//   0                     on success
//   ERR_NULL              project, result, or app version not found
//   ERR_BAD_RESULT_STATE  result already acked / ready to report / not in
//                         RESULT_FILES_DOWNLOADED state, or a task already
//                         in gstate.active_tasks uses the same slot
//   ERR_XML_PARSE         input ended before </active_task>
//
int ACTIVE_TASK::parse(MIOFILE& fin) {
    char buf[256], result_name[256], project_master_url[256];
    int app_version_num=0, n;   // n: scratch for values that are ignored
    unsigned int i;
    PROJECT* project;

    strcpy(result_name, "");
    strcpy(project_master_url, "");

    while (fin.fgets(buf, 256)) {
        if (match_tag(buf, "</active_task>")) {
            // End of element: turn the saved names into object pointers.
            //
            project = gstate.lookup_project(project_master_url);
            if (!project) {
                msg_printf(
                    NULL, MSG_ERROR,
                    "State file error: project %s not found\n",
                    project_master_url
                );
                return ERR_NULL;
            }
            result = gstate.lookup_result(project, result_name);
            if (!result) {
                msg_printf(
                    project, MSG_ERROR,
                    "State file error: result %s not found\n",
                    result_name
                );
                return ERR_NULL;
            }

            // various sanity checks
            //
            if (result->got_server_ack
                || result->ready_to_report
                || result->state != RESULT_FILES_DOWNLOADED
            ) {
                msg_printf(project, MSG_ERROR,
                    "State file error: result %s is in wrong state\n",
                    result_name
                );
                return ERR_BAD_RESULT_STATE;
            }

            wup = result->wup;
            app_version = gstate.lookup_app_version(
                result->app, app_version_num
            );
            if (!app_version) {
                msg_printf(
                    project, MSG_ERROR,
                    "State file error: application %s version %d not found\n",
                    result->app->name, app_version_num
                );
                return ERR_NULL;
            }

            // make sure no two active tasks are in same slot
            //
            for (i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
                ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
                if (atp->slot == slot) {
                    msg_printf(project, MSG_ERROR,
                        "State file error: two tasks in slot %d\n", slot
                    );
                    return ERR_BAD_RESULT_STATE;
                }
            }
            return 0;
        }
        else if (parse_str(buf, "<result_name>", result_name, sizeof(result_name))) continue;
        else if (parse_str(buf, "<project_master_url>", project_master_url, sizeof(project_master_url))) continue;
        else if (parse_int(buf, "<app_version_num>", app_version_num)) continue;
        else if (parse_int(buf, "<slot>", slot)) continue;
        else if (parse_double(buf, "<checkpoint_cpu_time>", checkpoint_cpu_time)) continue;
        else if (parse_double(buf, "<fraction_done>", fraction_done)) continue;
        else if (parse_double(buf, "<current_cpu_time>", current_cpu_time)) continue;
        // <active_task_state> (here) and <graphics_mode_acked> /
        // <scheduler_state> (below) are read into the scratch variable n
        // and discarded, so the task keeps its constructor defaults.
        //
        else if (parse_int(buf, "<active_task_state>", n)) continue;
        else if (parse_double(buf, "<swap_size>", procinfo.swap_size)) continue;
        else if (parse_double(buf, "<working_set_size>", procinfo.working_set_size)) continue;
        else if (parse_double(buf, "<working_set_size_smoothed>", procinfo.working_set_size_smoothed)) continue;
        else if (parse_double(buf, "<page_fault_rate>", procinfo.page_fault_rate)) continue;
        else if (match_tag(buf, "<supports_graphics/>")) continue;
        else if (parse_int(buf, "<graphics_mode_acked>", n)) continue;
        else if (parse_int(buf, "<scheduler_state>", n)) continue;
        else {
            if (log_flags.unparsed_xml) {
                msg_printf(0, MSG_ERROR,
                    "[unparsed_xml] ACTIVE_TASK::parse(): unrecognized %s\n", buf
                );
            }
        }
    }
    return ERR_XML_PARSE;
}

// Write the whole set as an <active_task_set> XML element, one
// <active_task> per task. Stops at, and returns, the first nonzero
// error from ACTIVE_TASK::write(); otherwise returns 0.
//
int ACTIVE_TASK_SET::write(MIOFILE& fout) {
    fout.printf("<active_task_set>\n");
    vector<ACTIVE_TASK*>::iterator it;
    for (it = active_tasks.begin(); it != active_tasks.end(); ++it) {
        int err = (*it)->write(fout);
        if (err) return err;
    }
    fout.printf("</active_task_set>\n");
    return 0;
}

// Parse an <active_task_set> element, consuming lines through
// </active_task_set>. Each <active_task> is parsed into a new ACTIVE_TASK.
// It is added to the set if it parsed cleanly and its slot is free;
// otherwise it is deleted.
// Returns 0 at </active_task_set>, or ERR_XML_PARSE if input runs out first.
//
int ACTIVE_TASK_SET::parse(MIOFILE& fin) {
    char buf[256];

    while (fin.fgets(buf, 256)) {
        if (match_tag(buf, "</active_task_set>")) return 0;

        if (!match_tag(buf, "<active_task>")) {
            if (log_flags.unparsed_xml) {
                msg_printf(0, MSG_ERROR,
                    "[unparsed_xml] ACTIVE_TASK_SET::parse(): unrecognized %s\n", buf
                );
            }
            continue;
        }

        ACTIVE_TASK* atp = new ACTIVE_TASK;
        int retval = atp->parse(fin);
        if (!retval && slot_taken(atp->slot)) {
            msg_printf(atp->result->project, MSG_ERROR,
                "slot %d in use; discarding result %s",
                atp->slot, atp->result->name
            );
            retval = ERR_XML_PARSE;
        }
        if (retval) {
            delete atp;
        } else {
            active_tasks.push_back(atp);
        }
    }
    return ERR_XML_PARSE;
}

// Send msg on the channel immediately if nothing is already queued and the
// channel accepts it. Otherwise append it to the queue, to be retried by
// msg_queue_poll(); this keeps messages in order.
//
void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) {
    const bool sent = msgs.empty() && channel.send_msg(msg);

    if (log_flags.app_msg_send) {
        if (sent) {
            msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name);
        } else {
            msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name);
        }
    }
    if (!sent) {
        msgs.push_back(std::string(msg));
    }
}

// Try to send the oldest queued message (at most one per call).
// It is removed from the queue only if the channel accepts it;
// otherwise it stays at the head for the next poll.
//
void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) {
    if (msgs.empty()) return;

    if (log_flags.app_msg_send) {
        msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued", static_cast<int>(msgs.size()));
    }

    const char* head = msgs[0].c_str();
    if (!channel.send_msg(head)) {
        if (log_flags.app_msg_send) {
            msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: still deferred: %s to %s", head, name);
        }
        return;
    }

    if (log_flags.app_msg_send) {
        msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s to %s", head, name);
    }
    msgs.erase(msgs.begin());
}

// Remove every queued message whose text equals msg.
// Returns how many were removed.
//
int MSG_QUEUE::msg_queue_purge(const char* msg) {
    int purged = 0;
    unsigned int idx = 0;
    while (idx < msgs.size()) {
        if (strcmp(msg, msgs[idx].c_str()) != 0) {
            idx++;
            continue;
        }
        if (log_flags.app_msg_send) {
            msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s", msg);
        }
        msgs.erase(msgs.begin() + idx);
        purged++;
    }
    return purged;
}


void ACTIVE_TASK_SET::report_overdue() {
    unsigned int i;
    ACTIVE_TASK* atp;

    for (i=0; i<active_tasks.size(); i++) {
        atp = active_tasks[i];
        double diff = (gstate.now - atp->result->report_deadline)/86400;
        if (diff > 0) {
            msg_printf(atp->result->project, MSG_ERROR,
                "Task %s is %.2f days overdue.", atp->result->name, diff
            );
            msg_printf(atp->result->project, MSG_ERROR,
                "You may not get credit for it.  Consider aborting it."
            );
        }
    }
}

// Scan the slot directory for upload-request files, named
// UPLOAD_FILE_REQ_PREFIX followed by the logical name X of a file.
// For each one, look up X in the result. If found, run md5_file() on it
// (passing fip->md5_cksum and fip->nbytes to be filled in) and set its
// status to FILE_PRESENT, or to md5_file()'s error code.
// The request file is then deleted. Always returns 0.
//
int ACTIVE_TASK::handle_upload_files() {
    std::string filename;
    char buf[256], path[256];
    int retval;
    const size_t prefix_len = strlen(UPLOAD_FILE_REQ_PREFIX);

    DirScanner dirscan(slot_dir);
    while (dirscan.scan(filename)) {
        if (filename.compare(0, prefix_len, UPLOAD_FILE_REQ_PREFIX) != 0) {
            continue;
        }

        // Copying an over-long directory entry into buf used to overflow it;
        // such requests are now reported and discarded instead.
        if (filename.size() < sizeof(buf)) {
            strcpy(buf, filename.c_str());
            char* p = buf + prefix_len;
            FILE_INFO* fip = result->lookup_file_logical(p);
            if (fip) {
                get_pathname(fip, path);
                retval = md5_file(path, fip->md5_cksum, fip->nbytes);
                if (retval) {
                    fip->status = retval;
                } else {
                    fip->status = FILE_PRESENT;
                }
            } else {
                msg_printf(0, MSG_ERROR, "Can't find uploadable file %s", p);
            }
        } else {
            msg_printf(0, MSG_ERROR,
                "Upload request file name too long: %s", filename.c_str()
            );
        }

        // Build the request file's path without a fixed-size buffer.
        std::string req_path = std::string(slot_dir) + "/" + filename;
        boinc_delete_file(req_path.c_str());
    }
    return 0;
}

void ACTIVE_TASK_SET::handle_upload_files() {
    for (unsigned int i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        atp->handle_upload_files();
    }
}

bool ACTIVE_TASK_SET::want_network() {
    for (unsigned int i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        if (atp->want_network) return true;
    }
    return false;
}

void ACTIVE_TASK_SET::network_available() {
    for (unsigned int i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        if (atp->want_network) {
            atp->send_network_available();
        }
    }
}

// Tell the app that the upload of fip has finished.
// The status is written to <slot dir>/UPLOAD_FILE_STATUS_PREFIX<open name>
// as "<status>N</status>", and send_upload_file_status is set.
// If the status file can't be created, nothing happens.
//
void ACTIVE_TASK::upload_notify_app(const FILE_INFO* fip, const FILE_REF* frp) {
    // Build the path as a std::string: formatting slot_dir + prefix +
    // open_name into a 256-byte buffer with sprintf could overflow.
    std::string path = std::string(slot_dir) + "/"
        + UPLOAD_FILE_STATUS_PREFIX + frp->open_name;
    FILE* f = boinc_fopen(path.c_str(), "w");
    if (!f) return;
    fprintf(f, "<status>%d</status>\n", fip->status);
    fclose(f);
    send_upload_file_status = true;
}

// A file upload has finished. Notify every task whose result
// references that file (see ACTIVE_TASK::upload_notify_app()).
//
void ACTIVE_TASK_SET::upload_notify_app(FILE_INFO* fip) {
    for (unsigned int k=0; k<active_tasks.size(); k++) {
        ACTIVE_TASK* task = active_tasks[k];
        FILE_REF* ref = task->result->lookup_file(fip);
        if (!ref) continue;
        task->upload_notify_app(fip, ref);
    }
}

void ACTIVE_TASK_SET::init() {
    for (unsigned int i=0; i<active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks[i];
        atp->init(atp->result);
        atp->scheduler_state = CPU_SCHED_PREEMPTED;
    }
}

// RCS/CVS $Id$ revision string for this source file.
const char *BOINC_RCSID_778b61195e = "$Id: app.C,v 1.329 2006/11/21 12:21:45 charlief Exp $";
